package com.gitee.aleenjava.aliyun.mqtt.service.client;

import com.gitee.aleenjava.aliyun.mqtt.config.MqttPublisherProperties;
import com.gitee.aleenjava.aliyun.mqtt.tools.Tools;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * Publishes MQTT messages through a pool of pre-connected Paho clients.
 *
 * <p>On construction one client is created and connected for every generated client id of
 * every configured group. {@link #publish(String, MqttMessage)} then spreads messages over
 * the pool in round-robin order.
 *
 * @author cheng
 */
@Slf4j
public class MqttPublishClient {

    /** Keep-alive interval handed to the connect options. */
    private static final int KEEP_ALIVE_INTERVAL = 20;

    /** Number of messages that may be in flight at once (the original comment notes the default is 10). */
    private static final int MAX_INFLIGHT = 1000;

    /** Upper bound for blocking client calls, so a publish cannot block forever. */
    private static final long TIME_TO_WAIT_MILLIS = 5000;

    /**
     * Pool of connected clients. The field is static, so every instance of this class appends
     * to (and dispatches from) the same list.
     */
    public static List<MqttClient> mqttClientsList = new ArrayList<>(10);

    /** Monotonic counter used to pick the next client of the pool. */
    private final AtomicInteger currentDispatcherIndex = new AtomicInteger();

    /**
     * Creates and connects one MQTT client per generated client id and registers each of them
     * in {@link #mqttClientsList}.
     *
     * <p>If any client fails to connect, the clients created by this call are removed from the
     * pool and released again before the failure is propagated, so a failed construction does
     * not leave open connections behind.
     *
     * @param mqttPublisherProperties broker address, credentials, group ids and pool sizing
     * @throws MqttException if a client cannot be created or connected
     */
    public MqttPublishClient(MqttPublisherProperties mqttPublisherProperties) throws MqttException {
        Map<String, List<String>> clientIDMap = generateClientIds(mqttPublisherProperties);
        // The broker URL is the same for every client, so build it once.
        String domainUrl = String.format("tcp://%s:%s", mqttPublisherProperties.getIp(), mqttPublisherProperties.getPort());
        List<MqttClient> createdClients = new ArrayList<>();
        try {
            for (String group : mqttPublisherProperties.getGroupIds()) {
                log.info("group {} ", group);
                for (String clientId : clientIDMap.get(group)) {
                    log.info("clientId {}", clientId);
                    MqttConnectOptions connectOptions = buildConnectOptions(mqttPublisherProperties, clientId);

                    MqttClient client = new MqttClient(domainUrl, clientId, new MemoryPersistence());
                    // Track the client before connecting so it is released if connect() fails.
                    createdClients.add(client);
                    client.setTimeToWait(TIME_TO_WAIT_MILLIS);
                    client.connect(connectOptions);

                    mqttClientsList.add(client);
                }
            }
        } catch (MqttException | RuntimeException e) {
            releaseClients(createdClients);
            throw e;
        }
        log.info("create mqtt connection size :{} ", mqttClientsList.size());
    }

    /**
     * Builds the connect options for a single client.
     *
     * @param mqttPublisherProperties source of the timeout, access key, instance id and secret key
     * @param clientId                client id the password signature is computed for
     * @return freshly configured connect options
     */
    private MqttConnectOptions buildConnectOptions(MqttPublisherProperties mqttPublisherProperties, String clientId) {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setAutomaticReconnect(true);
        connectOptions.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
        connectOptions.setMaxInflight(MAX_INFLIGHT);
        connectOptions.setConnectionTimeout(mqttPublisherProperties.getConnectionTimeout());
        connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
        connectOptions.setUserName("Signature|" + mqttPublisherProperties.getAccessKey() + "|" + mqttPublisherProperties.getInstanceId());
        connectOptions.setPassword(Tools.macSignature(clientId, mqttPublisherProperties.getSecretKey()).toCharArray());
        return connectOptions;
    }

    /**
     * Removes the given clients from the pool, then disconnects and closes them. Failures are
     * logged and otherwise ignored because this only runs while another error is propagating.
     *
     * @param clients clients to release
     */
    private static void releaseClients(List<MqttClient> clients) {
        mqttClientsList.removeAll(clients);
        for (MqttClient client : clients) {
            try {
                if (client.isConnected()) {
                    client.disconnect();
                }
                client.close();
            } catch (MqttException e) {
                log.warn("failed to release mqtt client {}", client.getClientId(), e);
            }
        }
    }

    /**
     * Generates the client ids for every configured group.
     *
     * <p>Each group gets one random 10-character code taken from a UUID; the ids of that group
     * are {@code group + "@@@" + code + (index + 10)}.
     *
     * @param mqttPublisherProperties supplies the group ids and the number of ids per group
     * @return map from group id to the client ids generated for it
     */
    private Map<String, List<String>> generateClientIds(MqttPublisherProperties mqttPublisherProperties) {
        Map<String, List<String>> clientIDMap = new HashMap<>(16);
        for (String group : mqttPublisherProperties.getGroupIds()) {
            List<String> clientIdList = new ArrayList<>(10);
            // Plain-text replace: no regular expression is needed to strip the dashes.
            String deviceCode = UUID.randomUUID().toString().replace("-", "").substring(3, 13);
            for (int i = 0; i < mqttPublisherProperties.getClientIdNums(); i++) {
                String deviceId = deviceCode + (i + 10);
                clientIdList.add(group + "@@@" + deviceId);
            }
            clientIDMap.put(group, clientIdList);
        }
        return clientIDMap;
    }

    /**
     * Picks the next client of the pool in round-robin order.
     *
     * @return the selected client
     * @throws IllegalStateException if the pool contains no clients
     */
    public MqttClient dispatchMqttClient() {
        int poolSize = mqttClientsList.size();
        if (poolSize == 0) {
            throw new IllegalStateException("no mqtt client available for dispatch");
        }
        // A single atomic increment hands every caller a distinct ticket; masking the sign bit
        // keeps the index non-negative once the counter overflows.
        int ticket = currentDispatcherIndex.getAndIncrement() & Integer.MAX_VALUE;
        return mqttClientsList.get(ticket % poolSize);
    }

    /**
     * Publishes a message on the given topic using the next client of the pool.
     *
     * @param topic   topic to publish to
     * @param message message to publish
     * @throws MqttException if the selected client fails to publish
     */
    public void publish(String topic, MqttMessage message) throws MqttException {
        log.info("topic {} ,send message {}", topic, message);
        MqttClient client = dispatchMqttClient();
        client.publish(topic, message);
    }

}
